import datetime
import enum
from copy import copy
import ipaddress
import itertools
import json
import logging
import math
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
    NamedTuple, Type

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from .utils import resolve_ip
from .migrations import queue_migrate_nfs_spec, queue_migrate_rgw_spec

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'


class HostCacheStatus(enum.Enum):
    stray = 'stray'
    host = 'host'
    devices = 'devices'


class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host!  This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration.  Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup.  use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        self._all_known_names: Dict[str, List[str]] = {}
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory or host in itertools.chain.from_iterable(self._all_known_names.values())

    def _get_stored_name(self, host: str) -> str:
        self.assert_host(host)
        if host in self._inventory:
            return host
        for stored_name, all_names in self._all_known_names.items():
            if host in all_names:
                return stored_name
        return host

    def update_known_hostnames(self, hostname: str, shortname: str, fqdn: str) -> None:
        for hname in [hostname, shortname, fqdn]:
            # if we know the host by any of the names, store the full set of names
            # in order to be able to check against those names for matching a host
            if hname in self._inventory:
                self._all_known_names[hname] = [hostname, shortname, fqdn]
                return
        logger.debug(f'got hostname set from gather-facts for unknown host: {[hostname, shortname, fqdn]}')

    def assert_host(self, host: str) -> None:
        if host not in self:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        if spec.hostname in self:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        host = self._get_stored_name(host)
        del self._inventory[host]
        self._all_known_names.pop(host, [])
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        host = self._get_stored_name(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        host = self._get_stored_name(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        host = self._get_stored_name(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        host = self._get_stored_name(host)
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        host = self._get_stored_name(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        hostname = info['hostname']
        hostname = self._get_stored_name(hostname)
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))


class SpecDescription(NamedTuple):
    spec: ServiceSpec
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]


class SpecStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}    # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]
        self._needs_configuration: Dict[str, bool] = {}

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                        (self.mgr.migration_current or 0) < 3
                        and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)

                if (
                        (self.mgr.migration_current or 0) < 6
                        and j['spec'].get('service_type') == 'rgw'
                ):
                    queue_migrate_rgw_spec(self.mgr, j)

                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'needs_configuration' in j:
                    self._needs_configuration[service_name] = cast(bool, j['needs_configuration'])

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                if isinstance(name, str) or m is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(
            self,
            spec: ServiceSpec,
            update_create: bool = True,
    ) -> None:
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec
        self._needs_configuration[name] = True

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
        }
        if name in self.spec_created:
            data['created'] = datetime_to_str(self.spec_created[name])
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])
        if name in self._needs_configuration:
            data['needs_configuration'] = self._needs_configuration[name]

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

    def rm(self, service_name: str) -> bool:
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            if service_name in self._needs_configuration:
                del self._needs_configuration[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())

    def set_unmanaged(self, service_name: str, value: bool) -> str:
        if service_name not in self._specs:
            return f'No service of name {service_name} found. Check "ceph orch ls" for all known services'
        if self._specs[service_name].unmanaged == value:
            return f'Service {service_name}{" already " if value else " not "}marked unmanaged. No action taken.'
        self._specs[service_name].unmanaged = value
        self.save(self._specs[service_name])
        return f'Set unmanaged to {str(value)} for service {service_name}'

    def needs_configuration(self, name: str) -> bool:
        return self._needs_configuration.get(name, False)

    def mark_needs_configuration(self, name: str) -> None:
        if name in self._specs:
            self._needs_configuration[name] = True
            self._save(name)
        else:
            self.mgr.log.warning(f'Attempted to mark unknown service "{name}" as needing configuration')

    def mark_configured(self, name: str) -> None:
        if name in self._specs:
            self._needs_configuration[name] = False
            self._save(name)
        else:
            self.mgr.log.warning(f'Attempted to mark unknown service "{name}" as having been configured')


class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
            self,
            entity: str,
            placement: PlacementSpec,
            mode: Optional[int] = None,
            uid: Optional[int] = None,
            gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        if 'placement' in c:
            c['placement'] = PlacementSpec.from_json(c['placement'])
        _cls = cls(**c)
        _cls.validate()
        return _cls


class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.mgr = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        if entity in self.keys:
            del self.keys[entity]
            self.save()


class TunedProfileStore():
    """
    Store for out tuned profile information
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: CephadmOrchestrator = mgr
        self.mgr = mgr
        self.profiles: Dict[str, TunedProfileSpec] = {}

    def __contains__(self, profile: str) -> bool:
        return profile in self.profiles

    def load(self) -> None:
        c = self.mgr.get_store('tuned_profiles') or b'{}'
        j = json.loads(c)
        for k, v in j.items():
            self.profiles[k] = TunedProfileSpec.from_json(v)
            self.profiles[k]._last_updated = datetime_to_str(datetime_now())

    def exists(self, profile_name: str) -> bool:
        return profile_name in self.profiles

    def save(self) -> None:
        profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
        self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))

    def add_setting(self, profile: str, setting: str, value: str) -> None:
        if profile in self.profiles:
            self.profiles[profile].settings[setting] = value
            self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
            self.save()
        else:
            logger.error(
                f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')

    def rm_setting(self, profile: str, setting: str) -> None:
        if profile in self.profiles:
            if setting in self.profiles[profile].settings:
                self.profiles[profile].settings.pop(setting, '')
                self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
                self.save()
            else:
                logger.error(
                    f'Attemped to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
        else:
            logger.error(
                f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')

    def add_profile(self, spec: TunedProfileSpec) -> None:
        spec._last_updated = datetime_to_str(datetime_now())
        self.profiles[spec.profile_name] = spec
        self.save()

    def rm_profile(self, profile: str) -> None:
        if profile in self.profiles:
            self.profiles.pop(profile, TunedProfileSpec(''))
        else:
            logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
        self.save()

    def last_updated(self, profile: str) -> Optional[datetime.datetime]:
        if profile not in self.profiles or not self.profiles[profile]._last_updated:
            return None
        return str_to_datetime(self.profiles[profile]._last_updated)

    def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
        if profile in self.profiles:
            self.profiles[profile]._last_updated = datetime_to_str(new_datetime)

    def list_profiles(self) -> List[TunedProfileSpec]:
        return [p for p in self.profiles.values()]


class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading.
    Like for example we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. As this is mostly read-only.

    4. `last_client_files` O(hosts)

    Stores the last digest and owner/mode for files we've pushed to /etc/ceph
    (ceph.conf or client keyrings).

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store it persistently, in order to stay consistent across
    MGR failovers.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
        self.devices = {}              # type: Dict[str, List[inventory.Device]]
        self.facts = {}                # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}    # type: Dict[str, datetime.datetime]
        self.last_autotune = {}        # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}     # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}             # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_network_update = {}   # type: Dict[str, datetime.datetime]
        self.last_device_update = {}   # type: Dict[str, datetime.datetime]
        self.last_device_change = {}   # type: Dict[str, datetime.datetime]
        self.last_tuned_profile_update = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.network_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}   # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}      # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

        self.metadata_up_to_date = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
                if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
                    continue
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.network_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                self.devices[host] = []
                # still want to check old device location for upgrade scenarios
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.devices[host] += self.load_host_devices(host)
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', {})
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_tuned_profile_update' in j:
                    self.last_tuned_profile_update[host] = str_to_datetime(
                        j['last_tuned_profile_update'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
        # return whether a host cache entry in the config-key
        # store is for a host, a set of devices or is stray.
        # for a host, the entry name will match a hostname in our
        # inventory. For devices, it will be formatted
        # <hostname>.devices.<integer> where <hostname> is
        # in out inventory. If neither case applies, it is stray
        if host in self.mgr.inventory:
            return HostCacheStatus.host
        try:
            # try stripping off the ".devices.<integer>" and see if we get
            # a host name that matches our inventory
            actual_host = '.'.join(host.split('.')[:-2])
            return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
        except Exception:
            return HostCacheStatus.stray

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        hostnames: List[str] = []
        for k in ['hostname', 'shortname', 'fqdn']:
            v = facts.get(k, '')
            hostnames.append(v if isinstance(v, str) else '')
        self.mgr.inventory.update_known_hostnames(hostnames[0], hostnames[1], hostnames[2])
        self.last_facts_update[host] = datetime_now()

    def update_autotune(self, host: str) -> None:
        self.last_autotune[host] = datetime_now()

    def invalidate_autotune(self, host: str) -> None:
        if host in self.last_autotune:
            del self.last_autotune[host]

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        old_devs = inventory.Devices(self.devices[host])
        new_devs = inventory.Devices(b)
        # relying on Devices class __eq__ function here
        if old_devs != new_devs:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices(
            self,
            host: str,
            dls: List[inventory.Device],
    ) -> None:
        if (
                host not in self.devices
                or host not in self.last_device_change
                or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls

    def update_host_networks(
            self,
            host: str,
            nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        self.networks[host] = nets
        self.last_network_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def update_client_file(self,
                           host: str,
                           path: str,
                           digest: str,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        if host not in self.last_client_files:
            self.last_client_files[host] = {}
        self.last_client_files[host][path] = (digest, mode, uid, gid)

    def removed_client_file(self, host: str, path: str) -> None:
        if (
            host in self.last_client_files
            and path in self.last_client_files[host]
        ):
            del self.last_client_files[host][path]

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.network_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.last_client_files[host] = {}

    def refresh_all_host_info(self, host):
        # type: (str) -> None

        self.last_host_check.pop(host, None)
        self.daemon_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.device_refresh_queue.append(host)
        self.last_facts_update.pop(host, None)
        self.osdspec_previews_refresh_queue.append(host)
        self.last_autotune.pop(host, None)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def invalidate_host_networks(self, host):
        # type: (str) -> None
        self.network_refresh_queue.append(host)
        if host in self.last_network_update:
            del self.last_network_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.last_tuned_profile_update:
            j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]
        if host in self.devices:
            self.save_host_devices(host)

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def save_host_devices(self, host: str) -> None:
        if host not in self.devices or not self.devices[host]:
            logger.debug(f'Host {host} has no devices to save')
            return

        devs: List[Dict[str, Any]] = []
        for d in self.devices[host]:
            devs.append(d.to_json())

        def byte_len(s: str) -> int:
            return len(s.encode('utf-8'))

        dev_cache_counter: int = 0
        cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
        if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
            # no guarantee all device entries take up the same amount of space
            # splitting it up so there's one more entry than we need should be fairly
            # safe and save a lot of extra logic checking sizes
            cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
            dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
            dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
                                                     for i in range(0, len(devs), dev_sublist_size)]
            for dev_list in dev_lists:
                dev_dict: Dict[str, Any] = {'devices': dev_list}
                if dev_cache_counter == 0:
                    dev_dict.update({'entries': len(dev_lists)})
                self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                                   + str(dev_cache_counter), json.dumps(dev_dict))
                dev_cache_counter += 1
        else:
            self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                               + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))

    def load_host_devices(self, host: str) -> List[inventory.Device]:
        dev_cache_counter: int = 0
        devs: List[Dict[str, Any]] = []
        dev_entries: int = 0
        try:
            # number of entries for the host's devices should be in
            # the "entries" field of the first entry
            dev_entries = json.loads(self.mgr.get_store(
                HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
        except Exception:
            logger.debug(f'No device entries found for host {host}')
        for i in range(dev_entries):
            try:
                new_devs = json.loads(self.mgr.get_store(
                    HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
                if len(new_devs) > 0:
                    # verify list contains actual device objects by trying to load one from json
                    inventory.Device.from_json(new_devs[0])
                    # if we didn't throw an Exception on above line, we can add the devices
                    devs = devs + new_devs
                    dev_cache_counter += 1
            except Exception as e:
                logger.error(('Hit exception trying to load devices from '
                             + f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
                return []
        return [inventory.Device.from_json(d) for d in devs]

    def rm_host(self, host):
        # type: (str) -> None
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.facts:
            del self.facts[host]
        if host in self.last_facts_update:
            del self.last_facts_update[host]
        if host in self.last_autotune:
            del self.last_autotune[host]
        if host in self.osdspec_previews:
            del self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            del self.osdspec_last_applied[host]
        if host in self.loading_osdspec_preview:
            self.loading_osdspec_preview.remove(host)
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.last_network_update:
            del self.last_network_update[host]
        if host in self.last_device_change:
            del self.last_device_change[host]
        if host in self.last_tuned_profile_update:
            del self.last_tuned_profile_update[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        if host in self.scheduled_daemon_actions:
            del self.scheduled_daemon_actions[host]
        if host in self.last_client_files:
            del self.last_client_files[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        return list(self.daemons)

    def get_schedulable_hosts(self) -> List[HostSpec]:
        """
        Returns all usable hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. thus we end up with a hosts
        where daemons might be running, but we have not yet detected them.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                self.host_had_daemon_refresh(h.hostname)
                and '_no_schedule' not in h.labels
            )
        ]

    def get_non_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that do not have _no_schedule label.

        Useful for the agent who needs this specific list rather than the
        schedulable_hosts since the agent needs to be deployed on hosts with
        no daemon refresh
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
        ]

    def get_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that have _no_schedule label and therefore should have
        no daemons placed on them, but are potentially still reachable
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' in h.labels
        ]

    def get_unreachable_hosts(self) -> List[HostSpec]:
        """
        Return all hosts that are offline or in maintenance mode.

        The idea is we should not touch the daemons on these hosts (since
        in theory the hosts are inaccessible so we CAN'T touch them) but
        we still want to count daemons that exist on these hosts toward the
        placement so daemons on these hosts aren't just moved elsewhere
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                h.status.lower() in ['maintenance', 'offline']
                or h.hostname in self.mgr.offline_hosts
            )
        ]

    def get_facts(self, host: str) -> Dict[str, Any]:
        return self.facts.get(host, {})

    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
        for dm in self.daemons.copy().values():
            yield from dm.values()

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        return list(self._get_daemons())

    def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
        r = []
        for dd in self._get_daemons():
            if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
                r.append(dd)
        return r

    def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
        return list(self.daemons.get(host, {}).values())

    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
        assert not daemon_name.startswith('ha-rgw.')
        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
        for dd in dds:
            if dd.name() == daemon_name:
                return dd

        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
        try:
            self.get_daemon(daemon_name, host)
        except orchestrator.OrchestratorError:
            return False
        return True

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.copy().items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)

    def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
        assert service_type not in ['keepalived', 'haproxy']

        daemons = self.daemons[host].values() if host else self._get_daemons()

        return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]

    def get_daemon_types(self, hostname: str) -> Set[str]:
        """Provide a list of the types of daemons on the host"""
        return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})

    def get_daemon_names(self):
        # type: () -> List[str]
        return [d.name() for d in self._get_daemons()]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
        return self.last_client_files.get(host, {})

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_autotune_memory(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.autotune_interval)
        if host not in self.last_autotune or self.last_autotune[host] < cutoff:
            return True
        return False

    def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile')
            return False
        if profile not in self.mgr.tuned_profiles:
            logger.debug(
                f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
            return False
        if host not in self.last_tuned_profile_update:
            return True
        last_profile_update = self.mgr.tuned_profiles.last_updated(profile)
        if last_profile_update is None:
            self.mgr.tuned_profiles.set_last_updated(profile, datetime_now())
            return True
        if self.last_tuned_profile_update[host] < last_profile_update:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_network_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
            return False
        if host in self.network_refresh_queue:
            self.network_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_network_update or self.last_network_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        #  Since this is dependent on other factors (device and spec) this does not  need
        #  to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def host_metadata_up_to_date(self, host: str) -> bool:
        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
            return False
        return True

    def all_host_metadata_up_to_date(self) -> bool:
        unreachables = [h.hostname for h in self.get_unreachable_hosts()]
        if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
            # this function is primarily for telling if it's safe to try and apply a service
            # spec. Since offline/maintenance hosts aren't considered in that process anyway
            # we don't want to return False if the host without up-to-date metadata is in one
            # of those two categories.
            return False
        return True

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each hosts at least once.
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
            'rotate-key': 6,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
        found = False
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
                found = True
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]
        return found

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)


class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.agent_config_deps = {}   # type: Dict[str, Dict[str,Any]]
        self.agent_counter = {}  # type: Dict[str, int]
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        self.agent_keys = {}  # type: Dict[str, str]
        self.agent_ports = {}  # type: Dict[str, int]
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass

    def save_agent(self, host: str) -> None:
        j: Dict[str, Any] = {}
        if host in self.agent_config_deps:
            j['agent_config_deps'] = {
                'deps': self.agent_config_deps[host].get('deps', []),
                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
            return False
        return True

    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)


class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:

        if event.kind_subject() not in self.events:
            self.events[event.kind_subject()] = [event]

        for e in self.events[event.kind_subject()]:
            if e.message == event.message:
                return

        self.events[event.kind_subject()].append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s, v in self.events.items():
            kind, subject = k_s.split(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('daemon:' + name, [])
